package sync

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
	"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
	fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
	"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
	"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
	"github.com/OffchainLabs/prysm/v7/crypto/rand"
	"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
	eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
	"github.com/OffchainLabs/prysm/v7/runtime/logging"
	prysmTime "github.com/OffchainLabs/prysm/v7/time"
	"github.com/OffchainLabs/prysm/v7/time/slots"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/pkg/errors"

	"github.com/sirupsen/logrus"
)

// validateDataColumn validates a data column sidecar received over gossipsub
// from peer pid and returns the pubsub verdict (accept, ignore or reject)
// together with the error that caused a non-accept verdict, if any.
//
// The checks follow the gossip rules of the Fulu p2p specification, with the
// ordering deviations noted inline. On acceptance the verified sidecar is
// stored in msg.ValidatorData, metrics are observed, a log entry is queued on
// s.dataColumnLogCh (dropped if the channel is full) and, when an operation
// notifier is configured, a DataColumnReceived event is sent.
//
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
	const dataColumnSidecarSubTopic = "/data_column_sidecar_%d/"

	dataColumnSidecarVerificationRequestsCounter.Inc()
	receivedTime := prysmTime.Now()

	// Always accept our own messages.
	if pid == s.cfg.p2p.PeerID() {
		return pubsub.ValidationAccept, nil
	}

	// Ignore messages during initial sync.
	if s.cfg.initialSync.Syncing() {
		return pubsub.ValidationIgnore, nil
	}

	// Reject messages with a nil topic.
	if msg.Topic == nil {
		return pubsub.ValidationReject, p2p.ErrInvalidTopic
	}

	// Decode the message, reject if it fails.
	m, err := s.decodePubsubMessage(msg)
	if err != nil {
		log.WithError(err).Error("Failed to decode message")
		return pubsub.ValidationReject, err
	}

	// Reject messages that are not of the expected type.
	dcsc, ok := m.(*eth.DataColumnSidecar)
	if !ok {
		log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar")
		return pubsub.ValidationReject, errWrongMessage
	}

	// Convert to a read-only data column sidecar.
	roDataColumn, err := blocks.NewRODataColumn(dcsc)
	if err != nil {
		return pubsub.ValidationReject, errors.Wrap(err, "roDataColumn conversion failure")
	}

	// Compute a batch of only one data column sidecar.
	roDataColumns := []blocks.RODataColumn{roDataColumn}

	// Create the verifier.
	verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)

	// Start the verification process.
	// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id

	// [REJECT] The sidecar is valid as verified by `verify_data_column_sidecar(sidecar)`.
	if err := verifier.ValidFields(); err != nil {
		return pubsub.ValidationReject, err
	}

	// [REJECT] The sidecar is for the correct subnet -- i.e. `compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id`.
	if err := verifier.CorrectSubnet(dataColumnSidecarSubTopic, []string{*msg.Topic}); err != nil {
		return pubsub.ValidationReject, err
	}

	// [IGNORE] The sidecar is not from a future slot (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance)
	//  -- i.e. validate that `block_header.slot <= current_slot` (a client MAY queue future sidecars for processing at the appropriate slot).
	if err := verifier.NotFromFutureSlot(); err != nil {
		return pubsub.ValidationIgnore, err
	}

	// [IGNORE] The sidecar is from a slot greater than the latest finalized slot
	// -- i.e. validate that `block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)`
	if err := verifier.SlotAboveFinalized(); err != nil {
		return pubsub.ValidationIgnore, err
	}

	// [IGNORE] The sidecar's block's parent (defined by `block_header.parent_root`) has been seen (via gossip or non-gossip sources
	// (a client MAY queue sidecars for processing once the parent block is retrieved).
	if err := verifier.SidecarParentSeen(s.hasBadBlock); err != nil {
		// If we haven't seen the parent, request it asynchronously.
		// A fresh background context is used so the request is not tied to
		// the lifetime of this validation call's ctx.
		go func() {
			customCtx := context.Background()
			parentRoot := roDataColumn.ParentRoot()
			roots := [][fieldparams.RootLength]byte{parentRoot}
			randGenerator := rand.NewGenerator()
			if err := s.sendBatchRootRequest(customCtx, roots, randGenerator); err != nil {
				log.WithError(err).WithFields(logging.DataColumnFields(roDataColumn)).Debug("Failed to send batch root request")
			}
		}()

		return pubsub.ValidationIgnore, err
	}

	// [REJECT] The sidecar's block's parent (defined by `block_header.parent_root`) passes validation.
	if err := verifier.SidecarParentValid(s.hasBadBlock); err != nil {
		return pubsub.ValidationReject, err
	}

	// [REJECT] The proposer signature of `sidecar.signed_block_header`, is valid with respect to the `block_header.proposer_index` pubkey.
	//          We do not strictly respect the spec ordering here. This is necessary because signature verification depends on the parent root,
	//          which is only available if the parent block is known.
	if err := verifier.ValidProposerSignature(ctx); err != nil {
		return pubsub.ValidationReject, err
	}

	// [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by `block_header.parent_root`).
	if err := verifier.SidecarParentSlotLower(); err != nil {
		return pubsub.ValidationReject, err
	}

	// [REJECT] The current `finalized_checkpoint` is an ancestor of the sidecar's block
	// -- i.e. `get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
	if err := verifier.SidecarDescendsFromFinalized(); err != nil {
		return pubsub.ValidationReject, err
	}

	// [REJECT] The sidecar's `kzg_commitments` field inclusion proof is valid as verified by `verify_data_column_sidecar_inclusion_proof(sidecar)`.
	if err := verifier.SidecarInclusionProven(); err != nil {
		return pubsub.ValidationReject, err
	}

	// [REJECT] The sidecar's column data is valid as verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`.
	validationResult, err := s.validateWithKzgBatchVerifier(ctx, roDataColumns)
	if validationResult != pubsub.ValidationAccept {
		return validationResult, err
	}
	// Mark KZG verification as satisfied since we did it via batch verifier
	verifier.SatisfyRequirement(verification.RequireSidecarKzgProofVerified)

	// [IGNORE] The sidecar is the first sidecar for the tuple `(block_header.slot, block_header.proposer_index, sidecar.index)`
	// with valid header signature, sidecar inclusion proof, and kzg proof.
	if s.hasSeenDataColumnIndex(roDataColumn.Slot(), roDataColumn.ProposerIndex(), roDataColumn.DataColumnSidecar.Index) {
		return pubsub.ValidationIgnore, nil
	}

	// [REJECT] The sidecar is proposed by the expected `proposer_index` for the block's slot in the context of the current shuffling (defined by `block_header.parent_root`/`block_header.slot`).
	// If the `proposer_index` cannot immediately be verified against the expected shuffling, the sidecar MAY be queued for later processing while proposers for the block's branch are calculated
	// -- in such a case do not REJECT, instead IGNORE this message.
	if err := verifier.SidecarProposerExpected(ctx); err != nil {
		return pubsub.ValidationReject, err
	}

	verifiedRODataColumns, err := verifier.VerifiedRODataColumns()
	if err != nil {
		// This should never happen.
		log.WithError(err).WithFields(logging.DataColumnFields(roDataColumn)).Error("Failed to get verified data columns")
		return pubsub.ValidationIgnore, err
	}

	verifiedRODataColumnsCount := len(verifiedRODataColumns)

	if verifiedRODataColumnsCount != 1 {
		// This should never happen.
		log.WithField("verifiedRODataColumnsCount", verifiedRODataColumnsCount).Error("Verified data columns count is not 1")
		return pubsub.ValidationIgnore, errors.New("Wrong number of verified data columns")
	}

	// Hand the verified sidecar over to the consumer of the pubsub message.
	msg.ValidatorData = verifiedRODataColumns[0]
	dataColumnSidecarVerificationSuccessesCounter.Inc()

	// Get the time at slot start.
	startTime, err := slots.StartTime(s.cfg.clock.GenesisTime(), roDataColumn.SignedBlockHeader.Header.Slot)
	if err != nil {
		return pubsub.ValidationIgnore, err
	}

	// NOTE(review): receivedTime comes from prysmTime.Now() while the end of
	// validation is read from s.cfg.clock.Now() — confirm both clocks agree.
	sinceSlotStartTime := receivedTime.Sub(startTime)
	validationTime := s.cfg.clock.Now().Sub(receivedTime)
	dataColumnSidecarArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
	dataColumnSidecarVerificationGossipHistogram.Observe(float64(validationTime.Milliseconds()))

	peerGossipScore := s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid)

	// Keep only the trailing characters of the peer ID for compact logging.
	// The slice is guarded so a short (or empty) peer ID string cannot panic.
	const peerSuffixLen = 6
	peerSuffix := pid.String()
	if len(peerSuffix) > peerSuffixLen {
		peerSuffix = peerSuffix[len(peerSuffix)-peerSuffixLen:]
	}

	// Non-blocking send: logging must never stall gossip validation.
	select {
	case s.dataColumnLogCh <- dataColumnLogEntry{
		Slot:            roDataColumn.Slot(),
		ColIdx:          roDataColumn.Index,
		PropIdx:         roDataColumn.ProposerIndex(),
		BlockRoot:       roDataColumn.BlockRoot(),
		ParentRoot:      roDataColumn.ParentRoot(),
		PeerSuffix:      peerSuffix,
		PeerGossipScore: peerGossipScore,
		validationTime:  validationTime,
		sinceStartTime:  sinceSlotStartTime,
	}:
	default:
		log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry")
	}

	if s.cfg.operationNotifier != nil {
		s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
			Type: operation.DataColumnReceived,
			Data: &operation.DataColumnReceivedData{
				Slot:           roDataColumn.Slot(),
				Index:          roDataColumn.Index,
				BlockRoot:      roDataColumn.BlockRoot(),
				KzgCommitments: bytesutil.SafeCopy2dBytes(roDataColumn.KzgCommitments),
			},
		})
	}

	return pubsub.ValidationAccept, nil
}

// hasSeenDataColumnIndex reports whether an entry for the given
// (slot, proposer index, column index) tuple is present in the seen data
// column cache.
func (s *Service) hasSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) bool {
	if _, found := s.seenDataColumnCache.Get(computeCacheKey(slot, proposerIndex, index)); found {
		return true
	}

	return false
}

// setSeenDataColumnIndex records the given (slot, proposer index, column index)
// tuple in the seen data column cache. The slot is handed to the cache
// alongside the derived key.
func (s *Service) setSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) {
	s.seenDataColumnCache.Add(slot, computeCacheKey(slot, proposerIndex, index), true)
}

// computeCacheKey builds the seen-cache key for a data column sidecar by
// concatenating, in order, the bytesutil.Bytes32 encodings of the slot, the
// proposer index and the column index, and returning the bytes as a string.
func computeCacheKey(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) string {
	// Capacity of 96 bytes: presumably three 32-byte chunks, one per component.
	buf := make([]byte, 0, 96)

	for _, component := range [...]uint64{uint64(slot), uint64(proposerIndex), index} {
		buf = append(buf, bytesutil.Bytes32(component)...)
	}

	return string(buf)
}

// dataColumnLogEntry carries the details of one accepted data column sidecar
// from validateDataColumn to processDataColumnLogs, which aggregates entries
// into periodic per-slot summary logs.
type dataColumnLogEntry struct {
	// Slot is the slot of the sidecar.
	Slot            primitives.Slot
	// ColIdx is the column index of the sidecar; it is also used as the
	// position of the entry in the per-slot array kept by processDataColumnLogs.
	ColIdx          uint64
	// PropIdx is the proposer index of the sidecar.
	PropIdx         primitives.ValidatorIndex
	// BlockRoot is the block root reported by the sidecar.
	BlockRoot       [32]byte
	// ParentRoot is the parent root reported by the sidecar.
	ParentRoot      [32]byte
	// PeerSuffix holds the trailing characters of the sending peer's ID string.
	// processDataColumnLogs treats an empty value as "no entry for this column".
	PeerSuffix      string
	// PeerGossipScore is the gossip score of the sending peer at validation time.
	PeerGossipScore float64
	// validationTime is the time elapsed between receiving the message and
	// finishing its validation.
	validationTime  time.Duration
	// sinceStartTime is the time elapsed between the start of the sidecar's
	// slot and the reception of the message.
	sinceStartTime  time.Duration
}

// processDataColumnLogs aggregates the entries received on s.dataColumnLogCh
// and, once per second, emits one debug-level summary line per slot before
// resetting the aggregation.
//
// Entries are kept per slot in a fixed-size array indexed by column index, so
// a later entry for the same (slot, column) overwrites the earlier one. Array
// positions whose PeerSuffix is empty are treated as unset and left out of the
// summary.
//
// The loop has no exit condition: the method blocks forever and is presumably
// meant to run in its own goroutine.
func (s *Service) processDataColumnLogs() {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	// formatMs renders a duration as fractional milliseconds. Dividing as
	// floats keeps the sub-millisecond part that Duration.Milliseconds (an
	// integer) would truncate, so the two decimals are meaningful.
	formatMs := func(d time.Duration) string {
		return fmt.Sprintf("%.2fms", float64(d)/float64(time.Millisecond))
	}

	slotStats := make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)

	for {
		select {
		case entry := <-s.dataColumnLogCh:
			cols := slotStats[entry.Slot]
			// Never let a bad column index panic the logging loop.
			if entry.ColIdx >= uint64(len(cols)) {
				log.WithFields(logrus.Fields{
					"slot":        entry.Slot,
					"columnIndex": entry.ColIdx,
				}).Debug("Skipping data column log entry with out of range column index")
				continue
			}
			// Array values are copied out of the map, so write the copy back.
			cols[entry.ColIdx] = entry
			slotStats[entry.Slot] = cols
		case <-ticker.C:
			for slot, columns := range slotStats {
				var (
					colIndices      = make([]uint64, 0, fieldparams.NumberOfColumns)
					peers           = make([]string, 0, fieldparams.NumberOfColumns)
					gossipScores    = make([]float64, 0, fieldparams.NumberOfColumns)
					validationTimes = make([]string, 0, fieldparams.NumberOfColumns)
					sinceStartTimes = make([]string, 0, fieldparams.NumberOfColumns)
				)

				totalReceived := 0
				for _, entry := range columns {
					// Unset array positions carry the zero value.
					if entry.PeerSuffix == "" {
						continue
					}
					colIndices = append(colIndices, entry.ColIdx)
					peers = append(peers, entry.PeerSuffix)
					gossipScores = append(gossipScores, roundFloat(entry.PeerGossipScore, 2))
					validationTimes = append(validationTimes, formatMs(entry.validationTime))
					sinceStartTimes = append(sinceStartTimes, formatMs(entry.sinceStartTime))
					totalReceived++
				}

				log.WithFields(logrus.Fields{
					"slot":            slot,
					"receivedCount":   totalReceived,
					"columnIndices":   colIndices,
					"peers":           peers,
					"gossipScores":    gossipScores,
					"validationTimes": validationTimes,
					"sinceStartTimes": sinceStartTimes,
				}).Debug("Accepted data column sidecars summary")
			}
			// Start the next aggregation window from scratch.
			slotStats = make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)
		}
	}
}

// roundFloat rounds f to the given number of decimal places, rounding halfway
// cases away from zero (the behavior of math.Round).
//
// When scaling by 10^decimals is not representable as a finite float64 —
// either because decimals is so large that the multiplier itself is infinite,
// or because f is so large that f*10^decimals overflows — f is returned
// unchanged: such values have no digits at the requested precision to round
// away. Infinite and NaN inputs are returned as-is.
func roundFloat(f float64, decimals int) float64 {
	mult := math.Pow(10, float64(decimals))
	if math.IsInf(mult, 0) {
		// More decimals requested than float64 can express: nothing to round.
		return f
	}

	scaled := f * mult
	if math.IsInf(scaled, 0) {
		// Either f is already infinite or the scaling overflowed; in both
		// cases f itself is the correct result.
		return f
	}

	return math.Round(scaled) / mult
}
